<?php
namespace app\api\controller;

// use app\common\model\RoymqMessageModel;
use app\common\mq\CMQ;
use app\common\mq\PulsarMQ;
use app\common\mq\Roymq;
use think\cache\driver\Redis;

class Mq extends Common
{
    //region CMQ 发送、接收消息
    public function sendMessage()
    {
        $queue_name = $this->param['queue_name'] ?? '';
        $msg_body = $this->param['msg_body'] ?? '';
        if (empty($queue_name) || empty($msg_body)) {
            return ajax('参数缺失', 101);
        }
        $cmq = new CMQ();
        $res = $cmq->sendMessage($queue_name, $msg_body);
        if (!empty($res->msgId)) {
            return ajax('发送成功', 200, $res);
        } else {
            return ajax('发送失败', 400, $res);
        }
    }

    public function batchSendMessage()
    {
        $queue_name = $this->param['queue_name'] ?? '';
        $msg_list = $this->param['msg_list'] ?? [];
        if (empty($queue_name) || empty($msg_list)) {
            return ajax('参数缺失', 101);
        }
        $cmq = new CMQ();
        $res = $cmq->batchSendMessage($queue_name, $msg_list);
        if (is_array($res)) {
            return ajax('发送成功', 200, $res);
        } else {
            return ajax('发送失败', 400, $res);
        }
    }

    public function receiveMessage()
    {
        $queue_name = $this->param['queue_name'] ?? '';
        if (empty($queue_name)) {
            return ajax('参数缺失', 101);
        }
        $cmq = new CMQ();
        $res = $cmq->receiveMessage($queue_name);
        if (!empty($res->msgId)) {
            return ajax('消费成功', 200, $res);
        } else {
            return ajax('消费失败', 400, $res);
        }
    }

    public function batchReceiveMessage()
    {
        $queue_name = $this->param['queue_name'] ?? '';
        if (empty($queue_name)) {
            return ajax('参数缺失', 101);
        }
        $num_of_msg = $this->param['num_of_msg'] ?? 1;
        $cmq = new CMQ();
        $res = $cmq->batchReceiveMessage($queue_name, $num_of_msg);
        if (is_array($res)) {
            return ajax('消费成功', 200, $res);
        } else {
            return ajax('消费失败', 400, $res);
        }
    }

    public function deleteMessage()
    {
        $param = $this->param;
        if (empty($param['queue_name']) || empty($param['receipt_handle'])) {
            return ajax('参数缺失', 101);
        }
        $cmq = new CMQ();
        $res = $cmq->deleteMessage($param['queue_name'], $param['receipt_handle']);
        if ($res) {
            return ajax('删除成功', 200, $res);
        } else {
            return ajax('删除失败', 400, $res);
        }
    }

    public function publishMessage()
    {
        $param = $this->param;
        if (empty($param['topic_name']) || empty($param['message'])) {
            return ajax('参数缺失', 101);
        }
        $cmq = new CMQ();
        $res = $cmq->publishMessage($param);
        if (!empty($res['requestId']) && !empty($res['msgId'])) {
            return ajax('发布成功', 200, $res);
        } else {
            return ajax('发布失败', 400, $res);
        }
    }

    public function batchPublishMessage()
    {
        $param = $this->param;
        if (empty($param['topic_name']) || empty($param['message_list']) || !is_array($param['message_list'])) {
            return ajax('非法参数', 101);
        }
        $cmq = new CMQ();
        $res = $cmq->batchPublishMessage($param);
        if (!empty($res['requestId']) && !empty($res['msgList'])) {
            return ajax('发布成功', 200, $res);
        } else {
            return ajax('发布失败', 400, $res);
        }
    }

    public function receiveTopicMessage()
    {
        if (empty($this->param['queue_name'])) {
            return ajax('参数缺失', 101);
        }
        $cmq = new CMQ();
        $res = $cmq->receiveTopicMessage($this->param);
        if (!empty($res->msgId)) {
            return ajax('消费成功', 200, $res);
        } else {
            return ajax('消费失败', 400, $res);
        }
    }

    public function batchReceiveTopicMessage()
    {
        if (empty($this->param['queue_name']) || empty($this->param['num_of_msg'])) {
            return ajax('参数缺失', 101);
        }
        $queue_name = $this->param['queue_name'];
        $num_of_msg = intval($this->param['num_of_msg']);
        $cmq = new CMQ();
        $res = $cmq->batchReceiveTopicMessage($queue_name, $num_of_msg);
        if (is_array($res)) {
            return ajax('消费成功', 200, $res);
        } else {
            return ajax('消费失败', 400, $res);
        }
    }
    //endregion
    //region Pulsar 发送、接收消息
    public function sendPulsarMessage()
    {
        $param = $this->param;
        if (empty($param['topic']) || empty($param['payload'])) {
            return ajax('参数缺失', 101);
        }
        $pulsar = new PulsarMQ();
        $res = $pulsar->sendMessage($param);
        if (!empty($res->RequestId)) {
            return ajax('发送成功', 200, $res);
        } else {
            return ajax('发送失败', 400, $res);
        }
    }
    //endregion
    //region RoyMQ
    /**
     * 创建主题
     * @author 贺强
     * @time   2022/6/24 13:59
     */
    public function addTopic()
    {
        $param = $this->param;
        if (empty($param['topic_name'])) {
            return ajax('参数缺失', 101);
        }
        $res = Roymq::addTopic($param);
        if (!empty($res->Id)) {
            return ajax('创建成功', 200, $res);
        } else {
            return ajax('创建失败', 400, $res);
        }
    }

    /**
     * 创建队列
     * @author 贺强
     * @time   2022/6/28 9:45
     */
    public function addQueue()
    {
        $param = $this->param;
        if (empty($param['queue_name'])) {
            return ajax('队列名称不能为空', 101);
        }
        $res = Roymq::addQueue($param);
        if (!empty($res->Id)) {
            return ajax('创建成功', 200, $res);
        } else {
            return ajax('创建失败', 400, $res);
        }
    }

    /**
     * 发送消息
     * @author 贺强
     * @time   2022/6/24 15:11
     */
    public function sendMsg()
    {
        $param = $this->param;
        if (empty($param['msg_body'])) {
            return ajax('参数缺失', 101);
        }
        $res = Roymq::sendMsg($param);
        if (!empty($res->Id)) {
            return ajax('发送成功', 200, $res);
        } else {
            return ajax('发送失败', 400, $res);
        }
    }

    /**
     * 接收消息
     * @author 贺强
     * @time   2022/6/27 8:50
     */
    public function receive()
    {
        $queue = $this->param['queue_name'] ?? '';
        if (empty($queue)) {
            return ajax('队列名称不能为空', 101);
        }
        if (!empty($this->param['topic_name'])) {
            $queue = $queue . '@' . $this->param['topic_name'];
        }
        $queue = md5($queue);
        $redis = new Redis();
        $msg = $redis->get($queue);
        if (!empty($msg)) {
            $msg = json_decode($msg, true);
            foreach ($msg as &$m) {
                $m['receive_time'] = date('Y-m-d H:i:s');
                if (!empty($m['id'])) {
                    // RoymqMessageModel::modify(['receive_time' => time()], ['id' => $m['id']]);
                    $param = ['receive_time' => time(), 'id' => $m['id']];
                    queue('app\\common\\job\\MessageJob@receive', $param);
                }
            }
            $msg = key2hump($msg);
            $redis->delete($queue);
            return ajax('成功', 200, $msg);
        } else {
            return ajax('暂无消息', 400);
            // $model = new RoymqMessageModel();
            // $where = ['receive_time' => 0, 'queue_name' => $this->param['queue_name']];
            // if (!empty($this->param['topic'])) {
            //     $where['topic_name'] = $this->param['topic_name'];
            // }
            // $msg = $model->getList($where, ['id', 'msg_body', 'queue_name', 'topic_name', 'ctime']);
            // if (empty($msg)) {
            //     return '';
            // }
            // foreach ($msg as &$m) {
            //     RoymqMessageModel::modify(['receive_time' => time()], ['id' => $m['id']]);
            // }
            // $msg = key2hump($msg);
            // return ajax('成功', 200, $msg);
        }
    }
    //endregion
}
